{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Example of Save Image Data as a Feature Group in the Feature Store\n",
    "\n",
    "Often, image data can be fed in as raw data to deep learning models and requires less feature engineering than other type of data. Thus, in many cases you would **not** need need to store image data as a feature group in the feature store, but rather you would save it directly as a training dataset in for example .tfrecords or .petastorm format.\n",
    "\n",
    "However, sometimes you want to join image features with other types of features and you might also need to do feature engineering steps such as *data augmentation, image scaling, image normalization etc.*. This notebook will show you how you can save image data as a feature group in the feature store."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Starting Spark application\n"
     ]
    },
    {
     "data": {
      "text/html": [
       "<table>\n",
       "<tr><th>ID</th><th>YARN Application ID</th><th>Kind</th><th>State</th><th>Spark UI</th><th>Driver log</th><th>Current session?</th></tr><tr><td>9</td><td>application_1550835076939_0011</td><td>pyspark</td><td>idle</td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8088/proxy/application_1550835076939_0011/\">Link</a></td><td><a target=\"_blank\" href=\"http://hopsworks0.logicalclocks.com:8042/node/containerlogs/container_e01_1550835076939_0011_01_000001/demo_featurestore_admin000__meb10000\">Link</a></td><td>✔</td></tr></table>"
      ],
      "text/plain": [
       "<IPython.core.display.HTML object>"
      ]
     },
     "metadata": {},
     "output_type": "display_data"
    },
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "SparkSession available as 'spark'.\n"
     ]
    }
   ],
   "source": [
    "from hops import featurestore\n",
    "from hops import hdfs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 1: Read in the Raw Image Data\n",
    "\n",
    "You can read in the image data from HopsFS using for example Spark or Tensorflow. In this example we will use Spark to read in a batch of images stored in the folder `mnist/` inside your project"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "image_dir = hdfs.project_path() + \"mnist\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "['hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/mnist/img_1.jpg', 'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/mnist/img_108.jpg', 'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/mnist/img_17.jpg', 'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/mnist/img_23.jpg', 'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/mnist/img_4.jpg', 'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/mnist/img_5.jpg', 'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/mnist/img_54.jpg', 'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/mnist/img_63.jpg', 'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/mnist/img_69.jpg', 'hdfs://10.0.2.15:8020/Projects/demo_featurestore_admin000/mnist/img_98.jpg']"
     ]
    }
   ],
   "source": [
    "hdfs.ls(image_dir)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "image_df = spark.read.format(\"image\").load(image_dir)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 2: Process The Images (Feature Engineering)\n",
    "\n",
    "After having read the images using for example Spark or Tensorflow you can do feature engineering as you like with the images before you save them to the feature store."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "#image_df = image_df.map()...."
   ]
  },
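  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As a rough illustration of the kind of feature engineering mentioned above, the sketch below normalizes pixel values and applies a generic flip augmentation with NumPy. The helper names `normalize_image` and `flip_horizontal` are hypothetical and not part of the `hops` library, and applying them to the Spark dataframe (for example through a UDF) is not shown.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "\n",
    "def normalize_image(img):\n",
    "    # Scale uint8 pixel values from [0, 255] to floats in [0.0, 1.0].\n",
    "    return img.astype(np.float32) / 255.0\n",
    "\n",
    "\n",
    "def flip_horizontal(img):\n",
    "    # Mirror the image left-to-right (a generic augmentation step).\n",
    "    return img[:, ::-1]\n",
    "\n",
    "\n",
    "# Toy 2x3 grayscale image standing in for a real 28x28 MNIST digit.\n",
    "toy = np.array([[0, 51, 255], [102, 204, 0]], dtype=np.uint8)\n",
    "normalized = normalize_image(toy)\n",
    "flipped = flip_horizontal(toy)\n",
    "```\n",
    "\n",
    "Whether a given augmentation is appropriate depends on the data; a mirrored digit, for instance, may no longer be a valid sample."
   ]
  },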
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Step 3: Saving The Processed Images to the Feature Store as a Feature Group\n",
    "\n",
    "To save the images to the feature store as a feature group you can store them in the format that Spark automatically structures images:\n",
    "\n",
    "```\n",
    "root\n",
    " |-- image: struct (nullable = true)\n",
    " |    |-- origin: string (nullable = true)\n",
    " |    |-- height: integer (nullable = true)\n",
    " |    |-- width: integer (nullable = true)\n",
    " |    |-- nChannels: integer (nullable = true)\n",
    " |    |-- mode: integer (nullable = true)\n",
    " |    |-- data: binary (nullable = true)\n",
    "```\n",
    "Or you can setup your own custom format for storing the images (for example flattening each image to a float array)."
   ]
  },
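  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A minimal sketch of the custom-format option, assuming the `data` field holds one unsigned byte per pixel and channel. `flatten_image` is a hypothetical helper, not part of `hops` or Spark; wrapping it in a Spark UDF would be one way to apply it to the dataframe, though that wiring is not shown here.\n",
    "\n",
    "```python\n",
    "import numpy as np\n",
    "\n",
    "\n",
    "def flatten_image(data, height, width, n_channels):\n",
    "    # Interpret the raw bytes as unsigned 8-bit values and check the size\n",
    "    # against the dimensions reported in the image struct.\n",
    "    pixels = np.frombuffer(bytes(data), dtype=np.uint8)\n",
    "    if pixels.size != height * width * n_channels:\n",
    "        raise ValueError('byte length does not match image dimensions')\n",
    "    # Return a flat list of Python floats, one value per pixel and channel.\n",
    "    return pixels.astype(np.float32).tolist()\n",
    "\n",
    "\n",
    "# Toy 2x2 single-channel image; real MNIST rows would be 28x28x1.\n",
    "flat = flatten_image(bytearray([0, 128, 255, 64]), height=2, width=2, n_channels=1)\n",
    "```\n",
    "\n",
    "The size check guards against silently flattening a row whose byte payload does not match its declared dimensions."
   ]
  },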
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "computing descriptive statistics for : mnist_images_featuregroup\n",
      "Running sql: use demo_featurestore_admin000_featurestore"
     ]
    }
   ],
   "source": [
    "featurestore.create_featuregroup(image_df, \"mnist_images_featuregroup\", \n",
    "                                 feature_correlation=False, \n",
    "                                 cluster_analysis=False,\n",
    "                                feature_histograms=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore\n",
      "Running sql: SELECT * FROM mnist_images_featuregroup_1"
     ]
    }
   ],
   "source": [
    "image_fg = featurestore.get_featuregroup(\"mnist_images_featuregroup\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+\n",
      "|               image|\n",
      "+--------------------+\n",
      "|[hdfs://10.0.2.15...|\n",
      "|[hdfs://10.0.2.15...|\n",
      "|[hdfs://10.0.2.15...|\n",
      "|[hdfs://10.0.2.15...|\n",
      "|[hdfs://10.0.2.15...|\n",
      "+--------------------+\n",
      "only showing top 5 rows"
     ]
    }
   ],
   "source": [
    "image_fg.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- image: struct (nullable = true)\n",
      " |    |-- origin: string (nullable = true)\n",
      " |    |-- height: integer (nullable = true)\n",
      " |    |-- width: integer (nullable = true)\n",
      " |    |-- nChannels: integer (nullable = true)\n",
      " |    |-- mode: integer (nullable = true)\n",
      " |    |-- data: binary (nullable = true)"
     ]
    }
   ],
   "source": [
    "image_fg.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Example of Saving a Feature Group of Images as a Petastorm Training Dataset for Deep Learning"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 37,
   "metadata": {},
   "outputs": [],
   "source": [
    "from petastorm.codecs import ScalarCodec, NdarrayCodec\n",
    "from petastorm.unischema import Unischema, UnischemaField\n",
    "from petastorm import make_reader\n",
    "import numpy as np\n",
    "from petastorm.unischema import dict_to_spark_row, Unischema, UnischemaField\n",
    "from petastorm.tf_utils import tf_tensors, make_petastorm_dataset\n",
    "import tensorflow as tf"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Read Featuregroup with Images"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Running sql: use demo_featurestore_admin000_featurestore\n",
      "Running sql: SELECT * FROM mnist_images_featuregroup_1"
     ]
    }
   ],
   "source": [
    "image_fg = featurestore.get_featuregroup(\"mnist_images_featuregroup\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Inspect the Dimensions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Row(nChannels=1, height=28, width=28)"
     ]
    }
   ],
   "source": [
    "image_fg.select([\"image.nChannels\", \"image.height\", \"image.width\"]).first()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Drop Metadata Columns as Metadata Will be stored in the Petastorm Schema Instead"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [],
   "source": [
    "image_fg_binary = image_fg.withColumn(\"img\", image_fg.image.data).drop(\"image\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- img: binary (nullable = true)"
     ]
    }
   ],
   "source": [
    "image_fg_binary.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Convert Binary Byte Arrays to Numpy Arrays (Petastorm schema uses numpy datatypes)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [],
   "source": [
    "# If we had labels to the images we could have joined the dataframe with the labels and added it as a field in the schema.\n",
    "ImageSchema = Unischema('ImageSchema', [\n",
    "    UnischemaField('img', np.uint8, (28,28), NdarrayCodec(), False)\n",
    "])\n",
    "\n",
    "def create_dict(row):\n",
    "    return {\n",
    "    \"img\": np.array(row.img).reshape(28,28)\n",
    "    }"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_compatible_with_petastorm_schema = spark.createDataFrame(\n",
    "                                      image_fg_binary.rdd.map(create_dict)\\\n",
    "                                      .map(lambda x: dict_to_spark_row(ImageSchema, x))\n",
    "                                      , ImageSchema.as_spark_schema())"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Save Petastorm Training Dataset"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [],
   "source": [
    "petastorm_args = {\"schema\": ImageSchema}\n",
    "featurestore.create_training_dataset(df_compatible_with_petastorm_schema, \"image_test_petastorm_from_fg\", \n",
    "                                     data_format=\"petastorm\", petastorm_args=petastorm_args,\n",
    "                                    descriptive_statistics=False, feature_correlation=False,\n",
    "                                    feature_histograms=False, cluster_analysis=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Read a Petastorm Image Dataset using Tensorflow"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [],
   "source": [
    "def tensorflow_read_td(training_dataset):\n",
    "    OUTPUT_URL = featurestore.get_training_dataset_path(training_dataset)\n",
    "    # Example: use tf.data.Dataset API\n",
    "    with make_reader(OUTPUT_URL, hdfs_driver='libhdfs') as reader:\n",
    "        dataset = make_petastorm_dataset(reader)\n",
    "        iterator = dataset.make_one_shot_iterator()\n",
    "        tensor = iterator.get_next()\n",
    "        with tf.Session() as sess:\n",
    "            sample = sess.run(tensor)\n",
    "            print(sample)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "ImageSchema_view(img=array([[  0,   0,   0,   0,   0,   0,   0,   0,   4,   0,   0,   0,   0,\n",
      "          0,   6,   0,   5,   0,   0,   0,   0,   0,   0,   1,   0,   0,\n",
      "          0,   0],\n",
      "       [  0,   0,   0,   0,   0,   0,   0,   0,   0,   3,  12,   0,  15,\n",
      "          0,   0,   0,   0,   4,   5,   0,   0,   8,  10,   0,   0,   0,\n",
      "          0,   0],\n",
      "       [  0,   0,   0,   0,   0,   0,   0,   0,   9,   0,   0,   0,   4,\n",
      "          3,   0,   6,   3,   0,   0,  10,   6,   0,   0,   0,   0,   0,\n",
      "          0,   0],\n",
      "       [  0,   0,   0,   0,   0,   0,   0,   0,   0,  10,   2,   9,   0,\n",
      "         12,   4,   0,   0,   7,   1,   0,   0,   6,  15,   9,   0,   0,\n",
      "          0,   0],\n",
      "       [  0,   0,   0,   0,   0,   0,   0,   0,   4,   2,   0,  16,  60,\n",
      "        139, 138,  76,   0,   0,   1,   6,   5,   0,   0,   0,   0,   0,\n",
      "          0,   0],\n",
      "       [  0,   0,   0,   0,   0,   0,   0,   0,   0,   1,  14,  72, 255,\n",
      "        255, 233, 241,   0,   0,   3,   4,   1,   0,   7,  16,   0,   0,\n",
      "          0,   0],\n",
      "       [  0,   0,   0,   0,   0,   0,   0,   0,   4,   5, 166, 255, 254,\n",
      "        230, 255, 240, 147, 174, 146,  59,   3,   5,   2,   0,   0,   0,\n",
      "          0,   0],\n",
      "       [  0,   0,   0,   0,   0,   0,   0,   0,  32, 193, 252, 255, 237,\n",
      "        255, 241, 255, 255, 242, 255, 255, 181,  50,   0,  10,   0,   0,\n",
      "          0,   0],\n",
      "       [  2,   0,   6,   0,   0,   6,   0,  46, 159, 255, 241, 246, 255,\n",
      "        255, 231, 255, 252, 238, 255, 249, 247, 222,  25,   2,   6,   2,\n",
      "          0,   0],\n",
      "       [  1,   0,   0,   0,   0,   3,   0, 113, 250, 251, 255, 223, 227,\n",
      "        250, 255, 240, 254, 255, 239, 255, 240, 255,  76,   6,   0,   1,\n",
      "          0,   9],\n",
      "       [  1,   0,   0,   2,   6,   0,  36, 199, 255, 247, 248, 148,  64,\n",
      "        241, 252, 253, 115, 175, 255, 255, 245, 253, 188,   0,   0,   6,\n",
      "          0,   9],\n",
      "       [  2,   0,   0,   3,   4,   0, 113, 251, 252, 252, 144,   0,   0,\n",
      "        208, 248, 229,   5,   7, 105, 233, 255, 249, 227, 120,   5,  10,\n",
      "          0,   1],\n",
      "       [  3,   0,   0,   3,   0,  16, 199, 255, 253, 252,  19,   6,   0,\n",
      "        172, 242, 245,  94,  22,  17,  54, 255, 255, 252, 234,   7,   0,\n",
      "          0,   1],\n",
      "       [  4,   0,   4,   3,   0,  89, 253, 248, 252,  89,   7,   0,   0,\n",
      "         16,  98, 133, 116, 102,   0,   0,  89, 255, 244, 255,  39,   0,\n",
      "          6,   8],\n",
      "       [  2,   0,   5,   3,   0, 189, 255, 246, 245,  18,   8,   6,   0,\n",
      "          0,   0,   1,  23,   5,   0,   0,   4, 159, 247, 255, 142,   0,\n",
      "          1,   5],\n",
      "       [  1,   0,   3,   4,   0, 255, 249, 251,  92,   0,   0,   7,   8,\n",
      "          0,  22,   0,   0,   6,   0,   6,   0, 162, 255, 245, 249,  17,\n",
      "          0,   0],\n",
      "       [  0,   0,   2,   0,   4, 244, 251, 250,  50,   9,   0,   2,   1,\n",
      "          0,   0,   7,   0,   3,   0,   8,   0,  66, 244, 243, 255,   6,\n",
      "          0,   0],\n",
      "       [  2,   0,   6,  11,   0, 255, 252, 255,  34,   0,  13,   0,   0,\n",
      "          7,  10,   0,   8,  13,   0,   0,   5,  92, 226, 255, 189,  14,\n",
      "         11,   0],\n",
      "       [  8,   0,   0,  16,   0, 250, 255, 244, 196,  18,   1,   0,  19,\n",
      "          0,   0,   7,   0,   6,   1,  12,   4, 180, 255, 241,  53,   0,\n",
      "          0,   0],\n",
      "       [  2,   3,   0,   0,   6, 132, 248, 255, 255, 130,  82,   2,   0,\n",
      "          0,   0,   0,   6,   0,  20,  68, 250, 255, 242, 162,   0,   0,\n",
      "          0,   0],\n",
      "       [  0,  10,   5,   0,   3,  18, 174, 255, 252, 255, 249, 176, 165,\n",
      "        177, 107,  58,  82, 142, 192, 255, 255, 245, 150,   0,   0,  13,\n",
      "          0,   4],\n",
      "       [  0,   0,   9,   0,   6,   0,  45, 159, 231, 255, 255, 253, 255,\n",
      "        250, 230, 255, 243, 255, 251, 255, 232, 111,  37,   0,   0,   3,\n",
      "          0,   0],\n",
      "       [  4,   0,   1,   2,  10,   6,   0,  27, 101, 191, 246, 245, 251,\n",
      "        247, 255, 253, 247, 253, 219, 167,  30,  11,   0,   0,   6,   1,\n",
      "          2,   0],\n",
      "       [  0,   6,   1,   0,   0,   0,  11,   0,   0,  15,  23,  31, 108,\n",
      "        133, 137,  17,  38,   0,  25,   0,   0,   0,   0,   1,   0,   0,\n",
      "         12,   0],\n",
      "       [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,\n",
      "          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,\n",
      "          0,   0],\n",
      "       [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,\n",
      "          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,\n",
      "          0,   0],\n",
      "       [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,\n",
      "          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,\n",
      "          0,   0],\n",
      "       [  0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,\n",
      "          0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,\n",
      "          0,   0]], dtype=uint8))"
     ]
    }
   ],
   "source": [
    "tensorflow_read_td(\"image_test_petastorm_from_fg\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "",
   "name": "pysparkkernel"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "python",
    "version": 2
   },
   "mimetype": "text/x-python",
   "name": "pyspark",
   "pygments_lexer": "python2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}